{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "83acd0be",
   "metadata": {},
   "source": [
    "Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "<!--\n",
    "    Licensed to the Apache Software Foundation (ASF) under one\n",
    "    or more contributor license agreements.  See the NOTICE file\n",
    "    distributed with this work for additional information\n",
    "    regarding copyright ownership.  The ASF licenses this file\n",
    "    to you under the Apache License, Version 2.0 (the\n",
    "    \"License\"); you may not use this file except in compliance\n",
    "    with the License.  You may obtain a copy of the License at\n",
    "\n",
    "      http://www.apache.org/licenses/LICENSE-2.0\n",
    "\n",
    "    Unless required by applicable law or agreed to in writing,\n",
    "    software distributed under the License is distributed on an\n",
    "    \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
    "    KIND, either express or implied.  See the License for the\n",
    "    specific language governing permissions and limitations\n",
    "    under the License.\n",
    "-->\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5022179a",
   "metadata": {},
   "source": [
    "# Run Beam SQL in notebooks\n",
    "\n",
    "[Beam SQL](https://beam.apache.org/documentation/dsls/sql/overview/) allows a Beam user to query PCollections with SQL statements. Currently, `InteractiveRunner` does not support `SqlTransform` due to [BEAM-10708](https://issues.apache.org/jira/browse/BEAM-10708). However, a user could use the `beam_sql` magic to run Beam SQL in the notebook and introspect the result.\n",
    "\n",
    "`beam_sql` is an IPython [custom magic](https://ipython.readthedocs.io/en/stable/config/custommagics.html). If you're not familiar with magics, here are some [built-in examples](https://ipython.readthedocs.io/en/stable/interactive/magics.html). It's a convenient way to validate your queries locally against known/test data sources when prototyping a Beam pipeline with SQL, before productionizing it on remote cluster/services.\n",
    "\n",
    "First, let's load the `beam_sql` magic:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c6b6e3c1",
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext apache_beam.runners.interactive.sql.beam_sql_magics"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a7c43b84",
   "metadata": {},
   "source": [
    "Since SQL support in Beam Python SDK is implemented through xLang external transform, make sure you have below prerequisites:\n",
    "- Have `docker` installed;\n",
    "- Have jdk8 or jdk11 installed and $JAVA_HOME set;"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b280710a",
   "metadata": {},
   "outputs": [],
   "source": [
    "!docker image list\n",
    "!java --version\n",
    "!echo $JAVA_HOME"
   ]
  },
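  {
   "cell_type": "markdown",
   "id": "e4b1c7a9",
   "metadata": {},
   "source": [
    "The same prerequisites can also be checked from Python. Below is a minimal sketch that uses only the standard library; the helper name `check_sql_prerequisites` is hypothetical and is not part of Beam:\n",
    "\n",
    "```python\n",
    "import os\n",
    "import shutil\n",
    "\n",
    "\n",
    "def check_sql_prerequisites():\n",
    "    # Hypothetical helper: maps each prerequisite to True if it looks available.\n",
    "    java_home = os.environ.get('JAVA_HOME', '')\n",
    "    return {\n",
    "        'docker': shutil.which('docker') is not None,\n",
    "        'java': shutil.which('java') is not None,\n",
    "        'JAVA_HOME': bool(java_home) and os.path.isdir(java_home),\n",
    "    }\n",
    "\n",
    "\n",
    "for prerequisite, available in check_sql_prerequisites().items():\n",
    "    status = 'found' if available else 'MISSING'\n",
    "    print(prerequisite + ': ' + status)\n",
    "```\n",
    "\n",
    "A `MISSING` entry only means that the executable is not on `PATH` or that `JAVA_HOME` does not point to a directory. The sketch does not verify versions."
   ]
  },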
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "28b1b320",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optionally sets the logging level to reduce distraction.\n",
    "import logging\n",
    "\n",
    "logging.root.setLevel(logging.ERROR)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f6b8789f",
   "metadata": {},
   "source": [
    "**Important**: if you're using Beam built from your local source code, additionally:\n",
    "\n",
    "- Have the Java expansion service shadowjar built. Go to the root directory of your local beam repo and then execute:\n",
    "  `./gradlew :sdks:java:extensions:sql:expansion-service:shadowJar`.\n",
    "- Based on your jdk version, pull the docker image `docker pull apache/beam_java11_sdk` or java17, java21.\n",
    "- Then tag the image with your current Beam dev version.  You can check the dev version under `apache_beam.version.__version__`. For example, if you're using jdk11 and dev version is `x.x.x.dev`, execute `docker image tag apache/beam_java11_sdk:latest apache/beam_java11_sdk:x.x.x.dev`."
   ]
  },
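  {
   "cell_type": "markdown",
   "id": "a93f0d5e",
   "metadata": {},
   "source": [
    "The tagging command in the last step can be assembled from the version string. This is a minimal sketch, assuming that the images for other JDK versions follow the same `apache/beam_java<N>_sdk` naming pattern as the jdk11 example; `build_tag_command` is a hypothetical helper, not a Beam API:\n",
    "\n",
    "```python\n",
    "def build_tag_command(jdk_major, beam_version):\n",
    "    # Assumes the image is named apache/beam_java<N>_sdk, as in the jdk11 example.\n",
    "    image = 'apache/beam_java{}_sdk'.format(jdk_major)\n",
    "    return 'docker image tag {0}:latest {0}:{1}'.format(image, beam_version)\n",
    "\n",
    "\n",
    "# In a notebook, pass apache_beam.version.__version__ as the second argument.\n",
    "print(build_tag_command(11, 'x.x.x.dev'))\n",
    "```\n",
    "\n",
    "For jdk11 and `x.x.x.dev`, this prints the same command as the example above."
   ]
  },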
  {
   "cell_type": "markdown",
   "id": "14c8967d",
   "metadata": {},
   "source": [
    "## Query#1 - A simple static query\n",
    "\n",
    "The `beam_sql` magic can be used as either a line magic or a cell magic.\n",
    "You can check its usage by running:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c212dd89",
   "metadata": {},
   "outputs": [],
   "source": [
    "%beam_sql -h"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7914c1aa",
   "metadata": {},
   "source": [
    "You can run a simple SQL query (in Apache Calcite SQL [syntax](https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/)) to create a [schema-aware PCollection](https://beam.apache.org/documentation/programming-guide/#schemas) from static values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "895341fa",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%beam_sql -o query1_data\n",
    "SELECT CAST(5 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c394ead5",
   "metadata": {},
   "source": [
    "The `beam_sql` magic shows you the result of the SQL query.\n",
    "\n",
    "It also creates and outputs a PCollection named `query1_data` with `element_type` like `BeamSchema_...(id: int32, str: str)`.\n",
    "\n",
    "Note that you have **not** explicitly created a Beam pipeline. You get a PCollection because the `beam_sql` magic always **implicitly creates** a pipeline to execute your SQL query. To hold the elements with each field's type info, Beam automatically creates a schema as the `element_type` for the created PCollection."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "981b2cc9",
   "metadata": {},
   "source": [
    "To introspect the data again with more knobs, you can use `show`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e97caf83",
   "metadata": {},
   "outputs": [],
   "source": [
    "from apache_beam.runners.interactive import interactive_beam as ib\n",
    "ib.show(query1_data)\n",
    "# Uncomment below to set more args.\n",
    "# ib.show(query1_data, visualize_data=True, include_window_info=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f58b15a8",
   "metadata": {},
   "source": [
    "To materialize the PCollection into a pandas [DataFrame](https://pandas.pydata.org/pandas-docs/stable/user_guide/dsintro.html#dataframe) object, you can use `collect`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "47b8da1a",
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.collect(query1_data)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "09b4f24c",
   "metadata": {},
   "source": [
    "You can also additionally append some transforms such as writing to a text file and print the elements:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9a650bbb",
   "metadata": {},
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "\n",
    "coder=beam.coders.registry.get_coder(query1_data.element_type)\n",
    "print(coder)\n",
    "query1_data | beam.io.textio.WriteToText('/tmp/query1_data', coder=coder)\n",
    "query1_data | beam.Map(print)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6cf89704",
   "metadata": {},
   "source": [
    "Execute the pipeline as a normal pipeline running on DirectRunner and inspect the output file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d524e1a0",
   "metadata": {},
   "outputs": [],
   "source": [
    "!rm -rf /tmp/query1_data*\n",
    "query1_data.pipeline.run().wait_until_finish()\n",
    "!ls /tmp/query1_data*\n",
    "!cat /tmp/query1_data*"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5600945a",
   "metadata": {},
   "source": [
    "The coder in use is a `RowCoder`. The element is encoded and written to the text file. When inspecting it directly, it may display garbled strings. The file will be revisited later in Query#4."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "30aa1188",
   "metadata": {},
   "source": [
    "### [Optional] Omit the `-o` option.\n",
    "If the option is omitted, an output name is auto-generated based on the SQL query and PCollection (if any) it queries. Optionally, you can also use the `_[{execution_count}]` convention: `_` for last output and `_{execution_count}` for a specific cell execution output.\n",
    "\n",
    "However, explicitly naming the output is recommended for better notebook readability and to avoid unexpected errors.\n",
    "\n",
    "Below example outputs a PCollection named like `sql_output_...`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b445e4f1",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%beam_sql\n",
    "SELECT CAST(1 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c7b9e4fb",
   "metadata": {},
   "source": [
    "Now that you are familiar with the `beam_sql` magic, you can build more queries against PCollections.\n",
    "\n",
    "Let's install the `names` package to randomly generate some names."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ef1ca0fc",
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install names"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1c0d5739",
   "metadata": {},
   "source": [
    "Import all modules needed for this example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "20cdf3b9",
   "metadata": {},
   "outputs": [],
   "source": [
    "import names\n",
    "import typing\n",
    "\n",
    "import apache_beam as beam\n",
    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
    "from apache_beam.runners.interactive import interactive_beam as ib"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "00db1574",
   "metadata": {},
   "source": [
    "Create a pipeline `p` with the `InteractiveRunner`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "24caeb60",
   "metadata": {},
   "outputs": [],
   "source": [
    "p = beam.Pipeline(InteractiveRunner())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0a4ca6eb",
   "metadata": {},
   "source": [
    "Then let's create a schema with `typing.NamedTuple`. Let's call it `Person` with a field `id` and a field `name`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "23910a9d",
   "metadata": {},
   "outputs": [],
   "source": [
    "class Person(typing.NamedTuple):\n",
    "    id: int\n",
    "    name: str"
   ]
  },
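  {
   "cell_type": "markdown",
   "id": "c58e2f71",
   "metadata": {},
   "source": [
    "A `typing.NamedTuple` carries field names and type annotations, which is the kind of information a schema describes. The standard-library sketch below only inspects that metadata. It repeats the `Person` definition to stay self-contained, and the sample values are made up:\n",
    "\n",
    "```python\n",
    "import typing\n",
    "\n",
    "\n",
    "class Person(typing.NamedTuple):\n",
    "    id: int\n",
    "    name: str\n",
    "\n",
    "\n",
    "# Field names, in declaration order.\n",
    "print(Person._fields)\n",
    "# Mapping from field name to annotated type.\n",
    "print(typing.get_type_hints(Person))\n",
    "\n",
    "# Instances behave like tuples with named attributes.\n",
    "someone = Person(id=1, name='Ada Lovelace')\n",
    "print(someone.id, someone.name)\n",
    "```"
   ]
  },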
  {
   "cell_type": "markdown",
   "id": "5c626d63",
   "metadata": {},
   "source": [
    "With `beam_sql` magic, you can utilize all the Beam I/O connectors (streaming is currently not supported due to `DirectRunner` not supporting streaming pipeline with `SqlTransform`) as source of data, then build a SQL query against all the data and check the output. If needed, you can sink the output following the `WriteToText` example demonstrated above."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2d892920",
   "metadata": {},
   "source": [
    "## Query#2 - Querying a single PCollection\n",
    "\n",
    "Let's build a PCollection with 10 random `Person` typed elements."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8a5fc9b9",
   "metadata": {},
   "outputs": [],
   "source": [
    "persons = (p \n",
    "           | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(10)]))\n",
    "ib.show(persons)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "84d64746",
   "metadata": {},
   "source": [
    "You can look for all elements with `id < 5` in `persons` with the below query and assign the output to `persons_id_lt_5`. Also, you can enable `-v` option to see more details about the execution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "07db1116",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%beam_sql -o persons_id_lt_5 -v\n",
    "SELECT * FROM persons WHERE id <5"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "68afa962",
   "metadata": {},
   "source": [
    "With `-v`, if it's the first time running this query, you might see a warning message about\n",
    "\n",
    "```\n",
    "Schema Person has not been registered to use a RowCoder. Automatically registering it by running: beam.coders.registry.register_coder(Person, beam.coders.RowCoder)\n",
    "```\n",
    "\n",
    "The `beam_sql` magic helps registering a `RowCoder` for each schema you define and use whenever it finds one. You can also explicitly run the same code to do so.\n",
    "\n",
    "Note the output element type is `Person(id: int, name: str)` instead of `BeamSchema_...` because you have selected all the fields from a single PCollection of the known type `Person(id: int, name: str)`."
   ]
  },
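  {
   "cell_type": "markdown",
   "id": "d2047b6f",
   "metadata": {},
   "source": [
    "To sanity-check the result, the query's logic can be mirrored in plain Python. This only illustrates what `WHERE id < 5` selects, not how Beam SQL executes it, and the fixed names are placeholders for the random ones:\n",
    "\n",
    "```python\n",
    "import typing\n",
    "\n",
    "\n",
    "class Person(typing.NamedTuple):\n",
    "    id: int\n",
    "    name: str\n",
    "\n",
    "\n",
    "# Placeholder data standing in for the random names generated in the notebook.\n",
    "sample_persons = [Person(id=x, name='person_{}'.format(x)) for x in range(10)]\n",
    "\n",
    "# Equivalent of: SELECT * FROM persons WHERE id < 5\n",
    "expected = [person for person in sample_persons if person.id < 5]\n",
    "print([person.id for person in expected])\n",
    "```\n",
    "\n",
    "The output of the SQL cell should contain the same five `id`s, 0 through 4."
   ]
  },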
  {
   "cell_type": "markdown",
   "id": "79587515",
   "metadata": {},
   "source": [
    "## Query#3 - Joining multiple PCollections\n",
    "\n",
    "You can build a `persons_2` PCollection with a different range of `id`s and `name`s. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c01fa39a",
   "metadata": {},
   "outputs": [],
   "source": [
    "persons_2 = (p \n",
    "             | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(5, 15)]))\n",
    "ib.show(persons_2)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6904ff8e",
   "metadata": {},
   "source": [
    "Then query for all `name`s from `persons` and `persons_2` with the same `id`s and assign the output to `persons_with_common_id`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2a0a60ff",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%beam_sql -o persons_with_common_id -v\n",
    "SELECT * FROM persons JOIN persons_2 USING (id)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4bb4df8a",
   "metadata": {},
   "source": [
    "Note the output element type is now some `BeamSchema_...(id: int64, name: str, name0: str)`. Because you have selected columns from both PCollections, there is no known schema to hold the result. Beam automatically creates a schema and differentiates conflicted field `name` by suffixing `0` to one of them.\n",
    "\n",
    "And since `Person` is already previously registered with a `RowCoder`, there is no more warning about registering it anymore even with `-v`."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cfcfeb76",
   "metadata": {},
   "source": [
    "## Query#4 - Join multiple PCollections, including I/O."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ce8abc3d",
   "metadata": {},
   "source": [
    "Let's read the file written by Query#1 and use it to join `persons` and `persons_2` to find `name`s with the common `id` in all three of them. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d1dea37b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use the exact same coder used when WriteToText and explicitly set the output types.\n",
    "query1_result_in_file = p | beam.io.ReadFromText(\n",
    "    '/tmp/query1_data*', coder=coder).with_output_types(\n",
    "    query1_data.element_type)\n",
    "\n",
    "# Check all the data sources.\n",
    "ib.show(query1_result_in_file)\n",
    "ib.show(persons)\n",
    "ib.show(persons_2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4bf6c422",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%beam_sql -o entry_with_common_id\n",
    "\n",
    "SELECT query1_result_in_file.id, persons.name AS `name_1`, persons_2.name AS `name_2`\n",
    "FROM query1_result_in_file JOIN persons ON query1_result_in_file.id = persons.id\n",
    "JOIN persons_2 ON query1_result_in_file.id = persons_2.id"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "282f6173",
   "metadata": {},
   "source": [
    "You can also chain another `beam_sql` magic to get just `name_1`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d858dd6c",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%beam_sql -o name_found\n",
    "SELECT name_1 AS `name` FROM entry_with_common_id"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
